-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add value_from_statisics
to AggregateUDFImpl, remove special case for min/max/count aggregate statistics
#12296
Conversation
b9262ec
to
f478344
Compare
datafusion/expr/src/udaf.rs
Outdated
@@ -262,6 +262,19 @@ impl AggregateUDF { | |||
self.inner.is_descending() | |||
} | |||
|
|||
/// Returns true if the function is min. Used by the optimizer | |||
pub fn is_min(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not we should do...
We need to understand the context and have a general function instead of a specialize name matching function.
default_value
is a good example, it is only used in count for now, but it could extend to any function if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jayzhan211 looking to this comment from @alamb #11151 (comment) it might be that those "general functions" might not exists, will do some homework
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is really no "general context", I think it is fine to just leave them as it is. Having specific function name matching in Impl Trait doesn't make sense to me 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @jayzhan211 that it is important that these methods describe some property about the function rather than "what the function is"
What if we added a function like this:
/// Return the value of this aggregate function from statistics
///
/// If the value of this aggregate, can be determined using only the
/// statistics, return `Some(value)`, otherwise return `None` (the default)
///
/// # Arguments
/// * `statistics`: the statistics describing the input to this aggregate functions
/// * `args`: the arguments passed to the aggregate function
///
/// The value of some aggregate functions such as `COUNT`, `MIN` and `MAX`
/// can be determined using statistics, if known
///
fn value_from_stats(&self, statistics: &Statistics, arguments: &[Arc<PhysicalExpr>]) -> Option<ScalarValue> { None }
I think you could then implement this function for min / max and count (moving logic out of the aggregate statistics optimizer). It might need some other information (like schema for types, for example) but I think it would be pretty straight forward
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @edmondop and @jayzhan211 - I have a suggestion about how to proceed. Let me know if that makes sense
Sorry (again) for the delay
datafusion/expr/src/udaf.rs
Outdated
@@ -262,6 +262,19 @@ impl AggregateUDF { | |||
self.inner.is_descending() | |||
} | |||
|
|||
/// Returns true if the function is min. Used by the optimizer | |||
pub fn is_min(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @jayzhan211 that it is important that these methods describe some property about the function rather than "what the function is"
What if we added a function like this:
/// Return the value of this aggregate function from statistics
///
/// If the value of this aggregate, can be determined using only the
/// statistics, return `Some(value)`, otherwise return `None` (the default)
///
/// # Arguments
/// * `statistics`: the statistics describing the input to this aggregate functions
/// * `args`: the arguments passed to the aggregate function
///
/// The value of some aggregate functions such as `COUNT`, `MIN` and `MAX`
/// can be determined using statistics, if known
///
fn value_from_stats(&self, statistics: &Statistics, arguments: &[Arc<PhysicalExpr>]) -> Option<ScalarValue> { None }
I think you could then implement this function for min / max and count (moving logic out of the aggregate statistics optimizer). It might need some other information (like schema for types, for example) but I think it would be pretty straight forward
Marking as draft as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look |
I tried to implement that solution, but the problem is now here that there is no real distinction between count and min_max. So the non distinct count doesn't get taken at line 61 but only at line https://github.com/apache/datafusion/pull/12296/files#diff-bca16ed42e39a82d942b706ad36b0d49502c12c9bc1c50e33ed443ce8c4d0437R65 We have two options imho:
|
} | ||
} | ||
None | ||
let value = agg_expr.fun().value_from_stats( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edmondop
We have agg_expr.is_distinct()
here, so you can differentiate distinct count and non-distinct case.
Maybe we can have StatisticsArgs
similar to AccumulatorArgs
.
pub struct StatisticsArgs<'a> {
statistics: &'a Statistics,
return_type: &'a DataType,
/// Whether the aggregate function is distinct.
///
/// ```sql
/// SELECT COUNT(DISTINCT column1) FROM t;
/// ```
pub is_distinct: bool,
/// The physical expression of arguments the aggregate function takes.
pub exprs: &'a [Arc<dyn PhysicalExpr>],
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the function for count would return None if the aggregate is distinct? It felt like leaking in the UDF logic required by physical optimiser
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It felt like leaking in the UDF logic required by physical optimiser
I think it makes sense, by default it has None, but we can also tune the function for optimizer 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to find a better name than value_from_stats
? I am not very clear why the distinct
for count is important if you are simply "getting the value" from stats, if the precision is Exact
It seems that distinct has specifically to do with the optimizer and not with "getting a value for the specific UDF from statistics"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optimization for count is mainly for count(*)
, since it doesn't make sense for distinct case, so we only care about non-distinct count
for value_from_stats
, but not distinct count
.
It seems that distinct has specifically to do with the optimizer and not with "getting a value for the specific UDF from statistics"
distinct
is for value_from_stats
to know what kind of function we have, either count
or distinct count
.
How is this PR coming @edmondop ? Do you think you'll be able to finish it in the near term ? If not, perhaps we can put up the issue as a good first issue for someone as it has a "pretty close" implementation that just needs some help finishing |
I will wrap up this weekend, it's so close you are right |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great @edmondop thank you
I think this is a really nice API and generalizing the optimization is 👍
@@ -93,6 +94,19 @@ impl fmt::Display for AggregateUDF { | |||
} | |||
} | |||
|
|||
pub struct StatisticsArgs<'a> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
I think it would also be great to add some documentation. Perhaps like this:
pub struct StatisticsArgs<'a> { | |
/// Arguments passed to [`AggregateUDFImpl::value_from_stats`] | |
pub struct StatisticsArgs<'a> { |
@@ -574,6 +595,10 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { | |||
fn is_descending(&self) -> Option<bool> { | |||
None | |||
} | |||
// Return the value of the current UDF from the statistics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Return the value of the current UDF from the statistics | |
/// Return the value of this UDF for the query if it can be determined entirely from | |
/// statistics and arguments. | |
/// | |
/// For example, if the minimum value of column `x` is known exactly in the statistics, | |
/// then `MIN(x)` can be replaced by that value, significantly improving query performance. |
@@ -291,6 +294,36 @@ impl AggregateUDFImpl for Count { | |||
fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> { | |||
Ok(ScalarValue::Int64(Some(0))) | |||
} | |||
|
|||
fn value_from_stats(&self, statistics_args: &StatisticsArgs) -> Option<ScalarValue> { | |||
if statistics_args.is_distinct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
value_from_statisics
to AggregateUDFImpl, remove special case for min/max/count aggregate statistics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thanks again so much @edmondop and @jayzhan211
|
Which issue does this PR close?
Closes #11151 .
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?